/**
 * Copyright (c) [Year] [name of copyright holder]
 *    [Software Name] is licensed under Mulan PSL v2.
 *    You can use this software according to the terms and conditions of the Mulan PSL v2.
 *    You may obtain a copy of Mulan PSL v2 at:
 *             http://license.coscl.org.cn/MulanPSL2
 *    THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 *    See the Mulan PSL v2 for more details.
 *
 *
 *                      Mulan Permissive Software License，Version 2
 *
 *    Mulan Permissive Software License，Version 2 (Mulan PSL v2)
 *    January 2020 http://license.coscl.org.cn/MulanPSL2
 *
 *    Your reproduction, use, modification and distribution of the Software shall be subject to Mulan PSL v2 (this License) with the following terms and conditions:
 *
 *    0. Definition
 *
 *       Software means the program and related documents which are licensed under this License and comprise all Contribution(s).
 *
 *       Contribution means the copyrightable work licensed by a particular Contributor under this License.
 *
 *       Contributor means the Individual or Legal Entity who licenses its copyrightable work under this License.
 *
 *       Legal Entity means the entity making a Contribution and all its Affiliates.
 *
 *       Affiliates means entities that control, are controlled by, or are under common control with the acting entity under this License, ‘control’ means direct or indirect ownership of at least fifty percent (50%) of the voting power, capital or other securities of controlled or commonly controlled entity.
 *
 *    1. Grant of Copyright License
 *
 *       Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable copyright license to reproduce, use, modify, or distribute its Contribution, with modification or not.
 *
 *    2. Grant of Patent License
 *
 *       Subject to the terms and conditions of this License, each Contributor hereby grants to you a perpetual, worldwide, royalty-free, non-exclusive, irrevocable (except for revocation under this Section) patent license to make, have made, use, offer for sale, sell, import or otherwise transfer its Contribution, where such patent license is only limited to the patent claims owned or controlled by such Contributor now or in future which will be necessarily infringed by its Contribution alone, or by combination of the Contribution with the Software to which the Contribution was contributed. The patent license shall not apply to any modification of the Contribution, and any other combination which includes the Contribution. If you or your Affiliates directly or indirectly institute patent litigation (including a cross claim or counterclaim in a litigation) or other patent enforcement activities against any individual or entity by alleging that the Software or any Contribution in it infringes patents, then any patent license granted to you under this License for the Software shall terminate as of the date such litigation or activity is filed or taken.
 *
 *    3. No Trademark License
 *
 *       No trademark license is granted to use the trade names, trademarks, service marks, or product names of Contributor, except as required to fulfill notice requirements in Section 4.
 *
 *    4. Distribution Restriction
 *
 *       You may distribute the Software in any medium with or without modification, whether in source or executable forms, provided that you provide recipients with a copy of this License and retain copyright, patent, trademark and disclaimer statements in the Software.
 *
 *    5. Disclaimer of Warranty and Limitation of Liability
 *
 *       THE SOFTWARE AND CONTRIBUTION IN IT ARE PROVIDED WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED. IN NO EVENT SHALL ANY CONTRIBUTOR OR COPYRIGHT HOLDER BE LIABLE TO YOU FOR ANY DAMAGES, INCLUDING, BUT NOT LIMITED TO ANY DIRECT, OR INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES ARISING FROM YOUR USE OR INABILITY TO USE THE SOFTWARE OR THE CONTRIBUTION IN IT, NO MATTER HOW IT’S CAUSED OR BASED ON WHICH LEGAL THEORY, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
 *
 *    6. Language
 *
 *       THIS LICENSE IS WRITTEN IN BOTH CHINESE AND ENGLISH, AND THE CHINESE VERSION AND ENGLISH VERSION SHALL HAVE THE SAME LEGAL EFFECT. IN THE CASE OF DIVERGENCE BETWEEN THE CHINESE AND ENGLISH VERSIONS, THE CHINESE VERSION SHALL PREVAIL.
 *
 *    END OF THE TERMS AND CONDITIONS
 *
 *    How to Apply the Mulan Permissive Software License，Version 2 (Mulan PSL v2) to Your Software
 *
 *       To apply the Mulan PSL v2 to your work, for easy identification by recipients, you are suggested to complete following three steps:
 *
 *       i Fill in the blanks in following statement, including insert your software name, the year of the first publication of your software, and your name identified as the copyright owner;
 *
 *       ii Create a file named “LICENSE” which contains the whole context of this License in the first directory of your software package;
 *
 *       iii Attach the statement to the appropriate annotated syntax at the beginning of each source file.
 *
 *
 *    Copyright (c) [Year] [name of copyright holder]
 *    [Software Name] is licensed under Mulan PSL v2.
 *    You can use this software according to the terms and conditions of the Mulan PSL v2.
 *    You may obtain a copy of Mulan PSL v2 at:
 *                http://license.coscl.org.cn/MulanPSL2
 *    THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 *    See the Mulan PSL v2 for more details.
 */
package per.sykes.ons.producer.normal;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.OrderProducerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
 * ProducerMessage
 * 消息发送工具
 * @author sykes
 * @date 2020-10-10 16:20
 */
public class ProducerMessage {

    private final Logger log = LoggerFactory.getLogger(ProducerMessage.class);

    @Resource
    private ProducerBean producer;

    @Resource
    private OrderProducerBean orderProducer;

    /**
     * 发送顺序消息 发送失败-发送结果为 null
     *
     * @param topic       主题
     * @param tag         消息过滤标签
     * @param shardingKey 全局顺序消息，该字段可以设置为任意非空字符串。
     * @param key         代表消息的业务关键属性，请尽可能全局唯一。。
     * @param object      消息内容
     * @return 发送结果
     */
    public SendResult sendOrderMessage(String topic, String tag, String shardingKey, String key, Object object) {
        Message message = this.sendMessage(topic, tag, key, object);
        try {
            return orderProducer.send(message, shardingKey);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 发送顺序消息 发送失败-发送结果为 null
     *
     * @param topic       主题
     * @param tag         消息过滤标签
     * @param shardingKey 全局顺序消息，该字段可以设置为任意非空字符串。
     * @param object      消息内容
     * @return 发送结果
     */
    public SendResult sendOrderMessage(String topic, String tag, String shardingKey, Object object) {
        Message message = this.sendMessage(topic, tag, null, object);
        try {
            return orderProducer.send(message, shardingKey);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 同步发送消息
     * SendResult 为空则消息发送失败
     * 如果消息发送失败，需要进行重试处理，可重新发送这条消息或持久化这条数据进行补偿处理。
     * 普通消息
     *
     * @param topic  主题
     * @param tag    标签
     * @param object 消息内容
     * @return 发送结果
     */
    public SendResult send(String topic, String tag, Object object) {
        Message message = this.sendMessage(topic, tag, null, object);
        return getSendResult(message);
    }

    /**
     * 同步发送消息
     *
     * @param message 消息载体
     *                body统一使用 hutool工具 JSONUtil.toJsonStr(object)
     * @return 发送结果
     */
    public SendResult send(Message message) {
        return getSendResult(message);
    }

    /**
     * 同步发送消息
     * SendResult 为空则消息发送失败
     * 如果消息发送失败，需要进行重试处理，可重新发送这条消息或持久化这条数据进行补偿处理。
     * 普通消息
     *
     * @param topic  消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
     * @param tag    消息标签, 请使用合法标识符, 尽量简短且见名知意
     * @param object 消息内容 消息内容长度默认不超过4M, 具体请参阅集群部署文档描述.
     */
    public SendResult send(String topic, String tag, String key, Object object) {
        Message message = this.sendMessage(topic, tag, key, object);
        return getSendResult(message);
    }

    /**
     * 异步发送消息
     * 普通消息
     * 如果消息发送失败，需要进行重试处理，可重新发送这条消息或持久化这条数据进行补偿处理。
     *
     * @param message      投递的消息
     * @param sendCallback 回调处理
     */
    public void sendAsync(Message message, SendCallback sendCallback) {
        producer.sendAsync(message, sendCallback);
    }

    /**
     * <p>
     * 异步发送消息
     * 如果消息发送失败，会进行重新发送
     * </p>
     *
     * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意
     * @param o   消息内容 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
     */
    public void sendAsync(String topic, String tag, Object o) {
        Message message = this.sendMessage(topic, tag, null, o);
        sendAsyncMsg(message);
    }

    /**
     * 异步发送消息,发送失败会进行重新发送
     *
     * @param topic 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
     * @param tag   消息标签, 请使用合法标识符, 尽量简短且见名知意
     * @param key   消息key
     * @param o     消息内容 消息内容长度默认不超过4M, 具体请参阅集群部署文档描述.
     */
    public void sendAsync(String topic, String tag, String key, Object o) {
        Message message = this.sendMessage(topic, tag, key, o);
        sendAsyncMsg(message);
    }

    private void sendAsyncMsg(Message message) {
        producer.sendAsync(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步消息发送成功,【Topic】:{}, 【MessageId】:{}", sendResult.getTopic(), sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                // 消息发送失败，可自定义进行处理
                log.error("异步消息发送失败，Topic】:{}, 【MessageId】:{}", context.getTopic(), context.getMessageId());
            }
        });
    }

    /**
     * 单向发送消息，不保证消息投递成功
     * 普通消息
     *
     * @param tag 消息标签
     * @param o   消息内容
     */
    public void sendOneWay(String topic, String tag, Object o) {
        producer.sendOneway(this.sendMessage(topic, tag, null, o));
    }

    /**
     * 单向发送消息，不保证消息投递成功
     * 普通消息
     *
     * @param topic 普通消息topic
     * @param tag   消息标签
     * @param o     消息内容
     */
    public void sendOneWay(String topic, String tag, String key, Object o) {
        producer.sendOneway(this.sendMessage(topic, tag, key, o));
    }

    /**
     * 发送定时/延时消息
     * 延时消息 最长时间不得超过40天
     * 同步发送消息，只要不抛异常就是成功。
     *
     * @param delayTime  延迟时间 System.currentTimeMillis() + (delayTime) 延时消息，时间戳，单位毫秒（ms）
     * @param delayTopic 延时消息topic
     * @param delayTag   延时消息tag
     * @param o          消息内容
     */
    public SendResult sendDelay(String delayTopic, String delayTag, Long delayTime, Object o) {
        checkDelayTime(delayTime);
        Message message = sendMessage(delayTopic, delayTag, null, o);
        message.setStartDeliverTime(delayTime);
        return getSendResult(message);
    }


    /**
     * 发送定时/延时消息
     * 延时消息 最长时间不得超过40天
     * 同步发送消息，只要不抛异常就是成功。
     *
     * @param day        延迟时间/天
     * @param delayTopic 延时消息topic
     * @param delayTag   延时消息tag
     * @param o          消息内容
     */
    public SendResult sendDelayDay(String delayTopic, String delayTag, Long day, Object o) {
        LocalDateTime delayLocalDateTime = LocalDateTime.now().plusDays(day);
        long delayTime = delayLocalDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        checkDelayTime(delayTime);
        Message message = sendMessage(delayTopic, delayTag, null, o);
        message.setStartDeliverTime(delayTime);
        return getSendResult(message);
    }

    /**
     * 发送定时/延时消息
     * 延时消息 最长时间不得超过40天
     * 同步发送消息，只要不抛异常就是成功。
     *
     * @param delayTime  延迟时间 System.currentTimeMillis() + (delayTime) 延时消息，时间戳，单位毫秒（ms）
     * @param delayTopic 延时消息topic
     * @param delayTag   延时消息tag
     * @param key        代表消息的业务关键属性，请尽可能全局唯一。
     * @param o          消息内容
     */
    public SendResult sendDelay(String delayTopic, String delayTag, Long delayTime, String key, Object o) {
        checkDelayTime(delayTime);
        Message message = sendMessage(delayTopic, delayTag, key, o);
        message.setStartDeliverTime(delayTime);
        return getSendResult(message);
    }


    /**
     * 发送定时/延时消息
     * 延时消息 最长时间不得超过40天
     * 同步发送消息，只要不抛异常就是成功。
     *
     * @param day        延迟时间/天
     * @param delayTopic 延时消息topic
     * @param delayTag   延时消息tag
     * @param key        代表消息的业务关键属性，请尽可能全局唯一。
     * @param o          消息内容
     */
    public SendResult sendDelayDay(String delayTopic, String delayTag, Long day, String key, Object o) {
        LocalDateTime delayLocalDateTime = LocalDateTime.now().plusDays(day);
        long delayTime = delayLocalDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        checkDelayTime(delayTime);
        Message message = sendMessage(delayTopic, delayTag, key, o);
        message.setStartDeliverTime(delayTime);
        return getSendResult(message);
    }

    /**
     * 发送消息并获得发送结果
     *
     * @param message 消息
     * @return 发送结果
     */
    private SendResult getSendResult(Message message) {
        SendResult send;
        try {
            send = producer.send(message);
        } catch (Exception e) {
            send = null;
            log.error("消息发送失败 message: {}", message);
        }
        return send;
    }

    /**
     * 构建消息
     *
     * @param topic  消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-"和下划线"_"构成.
     * @param tag    消息标签, 请使用合法标识符, 尽量简短且见名知意
     * @param key    代表消息的业务关键属性，请尽可能全局唯一。
     * @param object 消息内容 消息内容长度默认不超过4M, 具体请参阅集群部署文档描述.
     * @return Message
     */
    private Message sendMessage(String topic, String tag, String key, Object object) {
        byte[] bytes;
        if (isPrimitive(object)) {
            bytes = object.toString().getBytes(StandardCharsets.UTF_8);
        } else {
            bytes = JSONObject.toJSONString(object).getBytes();
        }
        return new Message(topic, tag, key, bytes);
    }

    /**
     * 判断是否为基础类型包装类
     * <pre>
     *     Long i = 3L
     *     isPrimitive(i) = true
     *     isPrimitive(new Object) = false
     * </pre>
     *
     * @param obj obj
     * @return boolean
     */
    private boolean isPrimitive(Object obj) {
        try {
            return ((Class<?>) obj.getClass().getField("TYPE").get(null)).isPrimitive();
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 校验定时/延时 发送时间是否超过最大限制时间
     *
     * @param delayTime 延迟时间
     */
    private static void checkDelayTime(Long delayTime) {
        LocalDateTime maxLimitTime = LocalDateTime.now().plusDays(40);
        final LocalDateTime sendTime = LocalDateTime.ofEpochSecond(delayTime / 1000, 0, ZoneOffset.ofHours(8));
        if (sendTime.isAfter(maxLimitTime)) {
            throw new ONSClientException("超出最大时间限制");
        }
    }


}
